#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>  
#include <errno.h>  
#include <sys/select.h>
#include <sys/types.h> 
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/wait.h>  
#include <sys/prctl.h>

#include <map>

#include "sync.h"
#include "file.h"
#include "strings.h"
#include "log.h"

/* Size in bytes of the receive buffer used by Sync::run().
 * Parenthesized so the macro expands safely inside larger expressions
 * (e.g. `x / MAX_RCV_BUF`). */
#define MAX_RCV_BUF (1024 * 1024)

/* Run flag polled by Sync::run(); cleared by handle_signal() on SIGHUP. */
volatile int sync_running = 1;

using namespace std;


/**
 * Signal handler installed for SIGCHLD (in the Sync constructor) and SIGHUP
 * (in Sync::run()).
 *
 *  - SIGCHLD: reaps every child that has already terminated, without blocking.
 *  - SIGHUP : logs the event, reaps terminated children and clears
 *             sync_running so that the loop in Sync::run() ends.
 *  - any other signal is ignored.
 *
 * errno is saved on entry and restored on exit: waitpid() overwrites it
 * (typically with ECHILD), and the code that was interrupted may be about to
 * inspect errno after a failed system call.
 *
 * @param signo number of the delivered signal.
 */
void handle_signal(int signo) {
    const int saved_errno = errno;
    int status;
    switch(signo){
        case SIGCHLD:
            // A child exited: collect its status so it does not stay a zombie.
            while(waitpid(-1, &status, WNOHANG) > 0);
            break;
        case SIGHUP:
            // NOTE(review): log_info is most likely not async-signal-safe;
            // kept to preserve the existing log output - confirm or move the
            // logging out of the handler.
            log_info("Sync process recv SIGHUP..");
            // Reap our own children first, then clear the run flag.
            while(waitpid(-1, &status, WNOHANG) > 0);
            sync_running = 0;
            break;
        default:
            break;
    }
    errno = saved_errno;
}


/**
 * Builds a Sync object, installs the SIGCHLD handler and runs init().
 *
 * slave_num and thread_quit are given defined values before init() runs:
 * init() increments slave_num, and the destructor reads thread_quit even when
 * init() found no "sync.slave" configuration and therefore never started the
 * worker thread.
 *
 * @param conf    configuration tree queried by this class (not owned here).
 * @param recv_fd descriptor that Sync::run() reads commands from.
 */
Sync::Sync(Config *conf, int recv_fd){
    this->conf = conf;
    this->recv_fd = recv_fd;
    this->slave_num = 0;
    this->thread_quit = true;   // no worker thread yet; init() flips this when it starts one
    signal(SIGCHLD, handle_signal);
    init();
}

/**
 * Stops the worker thread (if it is still flagged as running) and then
 * releases everything this object allocated.
 *
 * The thread is stopped first because it uses host_map and the sync queue;
 * tearing those down while it may still be running would be a use-after-free.
 *
 * Queue entries (insert_syncq) and slave descriptors (init) are allocated
 * with malloc(), so they are released with free() - not delete - together
 * with the strings they own.
 * NOTE(review): assumes every s_info stored through add_slave_host() was
 * malloc()ed like the ones created in init() - confirm for external callers.
 */
Sync::~Sync(){
    log_debug("stopping slave thread...");
    if(!thread_quit){
        stop();
    }

    // Drop commands that were queued but never processed.
    while (!(sq.empty())) {
        sync_t pending = sq.front();
        sq.pop();
        if(pending != NULL){
            free(pending->opt);
            free(pending->path);
            free(pending);
        }
    }

    std::map<string, s_info>::iterator itr;
    for(itr = host_map.begin(); itr != host_map.end(); ++itr){
        s_info info = itr->second;
        if(info != NULL){
            free(info->path);
            free(info);
        }
    }
    host_map.clear();
}

/**
 * Asks the worker thread to finish and waits for it.
 *
 * Raises thread_quit (the flag tested by the loop in _run_thread) and then
 * joins run_thread_tid. The thread's return value is not needed, so none is
 * collected. A join failure is only logged.
 */
void Sync::stop(){
    thread_quit = true;
    const int join_rc = pthread_join(run_thread_tid, NULL);
    if(join_rc == 0){
        return;
    }
    log_error("can't join thread: %s", strerror(join_rc));
}

/**
 * Reads the "sync.slave" configuration section, registers every slave host
 * found there and starts the worker thread.
 *
 * Each child entry whose key is "host" is expected to look like
 * "host:port:path". Malformed entries are logged and skipped. For every valid
 * entry a slave_info is allocated, stored through add_slave_host() and
 * slave_num is incremented.
 *
 * When the section is missing nothing happens (no thread is started).
 * When the thread cannot be created thread_quit is set back to true so that
 * nobody later tries to join a thread that does not exist.
 */
void Sync::init(){
    Config *cc = (Config *)conf->get("sync.slave");
    if(cc == NULL){
        return;
    }

    std::vector<Config *> *mychildren = &cc->children;
    std::vector<Config *>::iterator it;
    for(it = mychildren->begin(); it != mychildren->end(); it++){
        if(!((*it)->key == "host")){
            continue;
        }

        const char *host_buf = (*it)->str();
        if(NULL == host_buf){
            log_info("    bad slave host   : %s", "(null)");
            continue;
        }

        // Work on a private copy because the string is split in place.
        // Entries that do not fit are rejected instead of overflowing host[].
        char host[200];
        const size_t host_len = strlen(host_buf);
        if(host_len >= sizeof(host)){
            log_info("    bad slave host   : %s", host_buf);
            continue;
        }
        memcpy(host, host_buf, host_len + 1);

        char *port = strpbrk(host, ":");
        if( NULL == port ){
            log_info("    bad slave host   : %s", host);
            continue;
        }
        *(port++) = '\0';

        char *path = strpbrk(port, ":");
        if(NULL == path){
            log_info("    bad slave path   : %s", host);
            continue;
        }
        *(path++) = '\0';
        log_info("slave host: %s port: %s path: %s", host, port, path);

        s_info info = (s_info)malloc(sizeof(struct slave_info));
        if(NULL == info){
            log_error("out of memory while adding slave host: %s", host);
            continue;
        }
        const size_t path_size = strlen(path) + 1;
        info->port = str_to_int(port);
        info->path = (char *)malloc(path_size);
        if(NULL == info->path){
            log_error("out of memory while adding slave host: %s", host);
            free(info);
            continue;
        }
        memcpy(info->path, path, path_size);

        add_slave_host(trim(host), info);
        slave_num++;
    }

    /* Start a dedicated thread that handles sync commands. */
    thread_quit = false;
    int err = pthread_create(&run_thread_tid, NULL, &Sync::_run_thread, this);
    if(err != 0){
        log_error("can't create thread: %s", strerror(err));
        thread_quit = true;   // nothing to join later
    }
}

/**
 * Waits up to one second for `fd` to become readable.
 *
 * @param fd descriptor to watch.
 * @return the value returned by select(): >0 when fd is readable, 0 on
 *         timeout, -1 on error. A descriptor that cannot be stored in an
 *         fd_set (negative or >= FD_SETSIZE) yields -1 with errno = EBADF
 *         instead of invoking FD_SET with an out-of-range value.
 */
int Sync::event_check(int fd){
    if(fd < 0 || fd >= FD_SETSIZE){
        errno = EBADF;
        return -1;
    }

    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(fd, &rfds);

    struct timeval t_out;
    t_out.tv_sec = 1;
    t_out.tv_usec = 0;

    // nfds is "highest descriptor + 1"; only fd is being watched.
    return select(fd + 1, &rfds, NULL, NULL, &t_out);
}

/**
 * Main loop of the sync process.
 *
 * Installs the SIGHUP handler, asks the kernel to deliver SIGHUP when the
 * parent process dies, and then, until sync_running is cleared:
 *   1. waits (at most one second) for data on recv_fd,
 *   2. reads what is available into a NUL-terminated buffer,
 *   3. if at least one slave is configured, parses the text with
 *      handle_data() and executes the queued commands with handle_sync().
 * The descriptor is closed when the loop ends.
 *
 * recv() is only called when event_check() reported the descriptor readable;
 * calling it after a timeout or an interrupted select() could block
 * indefinitely on a blocking socket.
 *
 * NOTE(review): when the peer closes the connection recv() returns 0 and the
 * loop keeps polling; behaviour kept as is - confirm whether the loop should
 * terminate in that case.
 */
void Sync::run(){
    signal(SIGHUP, handle_signal);
    prctl(PR_SET_PDEATHSIG, SIGHUP); // signal delivered to this process when its parent exits
    const int fd = recv_fd;

    // One buffer for the whole loop; it is reset by writing a terminator
    // rather than by clearing the full megabyte on every iteration.
    char buf[MAX_RCV_BUF];

    while (sync_running) {
        buf[0] = '\0';

        const int fds = event_check(fd);
        if (fds > 0) {
            printf("begin recv data...\n");
            const int data_read = recv(fd, buf, sizeof (buf) - 1, 0);
            if (data_read == -1) {
                printf("reading failed\n");
            } else if (data_read > 0) {
                buf[data_read] = '\0';
                printf("\ndata read: %d, recv buf:%s\n", data_read, buf);
            }
            // data_read == 0: nothing was read, buf stays empty.
        }

        if(this->slave_num == 0){
            // No slave host to synchronise to: discard the input.
            continue;
        }
        this->handle_data(buf); // parse the received text into queued commands

        this->handle_sync();
    }
    close(fd);
}

/**
 * Registers a slave host in host_map.
 *
 * std::map::insert keeps an existing entry, so a second registration under
 * the same ip leaves the map unchanged.
 *
 * @param ip   key under which the slave is stored.
 * @param info descriptor (port and remote path) stored for that key.
 */
void Sync::add_slave_host(string ip, s_info info){
    typedef std::map<string, s_info>::value_type host_entry;
    const host_entry entry(ip, info);
    host_map.insert(entry);
}

/**
 * Splits a block of text into "<opt><space|tab><path>\n" records and queues
 * each record with insert_syncq().
 *
 * Parsing stops at the first record that has no separator, no terminating
 * newline, or an empty opt/path; whatever follows is ignored.
 *
 * The caller's buffer is not modified: the text is copied once into a
 * std::string (released automatically on every exit path) and the records
 * are walked iteratively instead of recursing and re-copying the remainder
 * for every line.
 *
 * @param data NUL-terminated text; NULL or empty input is ignored.
 */
void Sync::handle_data(char *data){
    if( (NULL == data) || is_empty_str(data) ){
        return;
    }

    std::string copy(data);
    char *cursor = &copy[0];

    while( !is_empty_str(cursor) ){
        // The operation name ends at the first blank.
        char *path = strpbrk(cursor, " \t");
        if(NULL == path){
            break;
        }
        *(path++) = '\0';

        // The path ends at the end of the line.
        char *next = strchr(path, '\n');
        if(NULL == next){
            break;
        }
        *(next++) = '\0';

        if( is_empty_str(cursor) || is_empty_str(path) ){
            break;
        }
        this->insert_syncq(cursor, path);
        cursor = next;
    }
}

/**
 * Executes and removes every command currently in the sync queue.
 *
 * Dispatch by operation name:
 *   copy     -> sync_dir()       (only when path is a directory)
 *   create   -> create_dir() for a directory, update_file() for a file
 *   delete   -> remove()
 *   modify   -> update_file()    (files only)
 *   attrib   -> update_attrib()  (files only)
 *   moved_to -> update_file()    (files only)
 * Anything else is logged as unknown.
 *
 * Each entry is taken off the queue and its malloc()ed storage (allocated in
 * insert_syncq) is released after the strings have been copied.
 *
 * NOTE(review): the "copy" branch passes a directory path to sync_dir(),
 * whose parameter is used as a slave IP there - confirm the intended argument.
 */
void Sync::handle_sync(){
    while (!(this->sq.empty())) {
        sync_t s = this->sq.front();
        this->sq.pop();
        if (NULL == s) {
            continue;
        }

        const std::string opt = (s->opt != NULL) ? s->opt : "";
        const std::string path = (s->path != NULL) ? s->path : "";
        free(s->opt);
        free(s->path);
        free(s);

        if ( opt == "copy"){
            if (is_dir(path)) {
                log_info("copy path: %s", path.c_str());
                this->sync_dir(path.c_str());
            }else{
                log_info("copy path failure. [%s] is not dir.", path.c_str());
            }
        } else if (opt == "create") {
            if (is_dir(path)) {
                log_info("create path: %s", path.c_str());
                this->create_dir(path.c_str());
            }
            if (is_file(path)) {
                log_info("create file: %s", path.c_str());
                this->update_file(path.c_str());
            }
        } else if (opt == "delete") {
            log_info("remove path: %s", path.c_str());
            this->remove(path.c_str());
        } else if (opt == "modify") {
            if (is_file(path)) {
                log_info("modify file: %s", path.c_str());
                this->update_file(path.c_str());
            }
        } else if (opt == "attrib") {
            if (is_file(path)) {
                log_info("update file attrib: %s", path.c_str());
                this->update_attrib(path.c_str());
            }
        } else if (opt == "moved_to") {
            if (is_file(path)) {
                log_info("moved_to file: %s", path.c_str());
                this->update_file(path.c_str());
            }
        } else {
            log_info("[%s] UNKNOW OPT path: %s", opt.c_str(), path.c_str());
        }
    }
}


void Sync::insert_syncq(char *opt, char *path){    
    sync_t s;
    s = (sync_t)malloc( sizeof(struct sync_struct) );
    s->opt = (char *)malloc(strlen(opt) + 1);    
    s->path = (char *)malloc(strlen(path) + 1);    
    memmove(s->opt, opt, strlen(opt) + 1); //使用memmove时不能将 目标 字符内存 重置 memset(dst_buf, src_buf, size) 会导致内存移动出错
    memmove(s->path, path, strlen(path) + 1);//内存中少拷贝一个字节导致处理的数据中出现乱码    
    this->sq.push(s);
    //printf("Insert. %s %s", s->opt, s->path);    
}

/**
 * Maps a local event path to the corresponding path on a slave.
 *
 * The first `sync.path`-length characters of event_path are dropped and the
 * remainder is appended to slave_path.
 *
 * @param event_path local path; expected to be at least as long as the
 *                   configured "sync.path".
 * @param slave_path base directory on the slave.
 * @return the combined path, or an empty string when either argument is NULL
 *         or event_path is shorter than "sync.path" (skipping the prefix
 *         would otherwise read past the end of event_path).
 */
std::string Sync::get_sync_path(const char *event_path, const char *slave_path){
    std::string slave_sync_path = "";
    if(NULL == event_path || NULL == slave_path) return slave_sync_path;

    const std::string sync_path = conf->get_str("sync.path");
    if(strlen(event_path) < sync_path.size()) return slave_sync_path;

    slave_sync_path = slave_path;
    slave_sync_path += event_path + sync_path.size();
    printf("buf: %s\n", slave_sync_path.c_str());
    return slave_sync_path;
}

void Sync::create_dir(const char *dir){    
    if( (NULL == dir) ||  !(is_dir(dir)) ) return;    
    std::map<string, s_info>::iterator itr = host_map.begin();
    for(;itr != host_map.end();) {
        if( !is_ip(itr->first.c_str()) ) return;
        std::string srv_str = conf->get_str("sync.user");
        srv_str += "@";
        srv_str += itr->first;
        std::string port = str(itr->second->port);
        std::string slave_path = get_sync_path(dir, itr->second->path); 
        
        std::string ident_file = "/root/.ssh/";        
        ident_file += conf->get_str("sync.user");
        ident_file += "_id_rsa";
        
        itr++;
        pid_t pid = fork();
        if( pid > 0){
            continue;
        }else{
            //printf("/usr/bin/ssh, ssh -p %s %s %s %s %s", port.c_str(), srv_str.c_str(), "mkdir", "-p", dir);
            execl("/usr/bin/ssh", "ssh", "-p", port.c_str(), "-i", ident_file.c_str(), srv_str.c_str(), "mkdir", "-p", slave_path.c_str(), NULL);      
            exit(0);
        }        
    }        
}

void Sync::update_attrib(const char *path){    
    if( (NULL == path) ||  !(is_dir(path) || is_file(path)) ) return;    
    std::map<string, s_info>::iterator itr = host_map.begin();
    for(;itr != host_map.end();) {
        if( !is_ip(itr->first.c_str()) ) return;
        std::string srv_str = conf->get_str("sync.user");
        srv_str += "@";
        srv_str += itr->first;
        std::string port = str(itr->second->port);
        std::string slave_path = get_sync_path(path, itr->second->path);        
        
        std::string ident_file = "/root/.ssh/";        
        ident_file += conf->get_str("sync.user");
        ident_file += "_id_rsa";
        
        itr++;
        pid_t pid = fork();
        if( pid > 0){
            continue;
        }else{
            std::string mod = str(path_mod(path));
            printf("path mod: %d port: %s slave path: %s\n", path_mod(path), port.c_str(), slave_path.c_str()); 
            execl("/usr/bin/ssh", "ssh", "-p", port.c_str(), "-i", ident_file.c_str(), srv_str.c_str(), "chmod", mod.c_str(), slave_path.c_str(), NULL);            
            exit(0);
        }        
    }        
}

void Sync::remove(const char *path){    
    std::map<string, s_info>::iterator itr = host_map.begin();
    for(;itr != host_map.end();) { 
        if( !is_ip(itr->first.c_str()) ) return;
        std::string ip = itr->first;
        std::string srv_str = conf->get_str("sync.user");
        srv_str += "@";
        srv_str += ip;
        std::string port = str(itr->second->port);
        std::string slave_path = get_sync_path(path, itr->second->path);    
        
        std::string ident_file = "/root/.ssh/";        
        ident_file += conf->get_str("sync.user");
        ident_file += "_id_rsa";
        
        itr++;
              
        pid_t pid = fork();
        if( pid > 0){            
            continue;
        }else{ 
            execl("/usr/bin/ssh", "ssh", "-p", port.c_str(), "-i", ident_file.c_str(), srv_str.c_str(), "rm", "-rf", slave_path.c_str(), NULL); 
            exit(0);         
        }        
    }
}

void Sync::sync_dir(const char *ip){
    std::string host = ip;
    if( !is_ip(host.c_str()) ) return;
    std::string dir = conf->get_str("sync.path");
    if( !(is_dir(dir)) ) return;    
    if( dir[dir.size()-1] != '/' ){
        dir += '/';
    }
    printf("sync dir: %s\n", dir.c_str());
    std::string srv_str = conf->get_str("sync.user");
    srv_str += "@";
    std::map<string, s_info>::iterator itr = host_map.begin();
    for(;itr != host_map.end();) {
        //printf("BB host: %s slave: %s\n", host.c_str(), itr->first.c_str());
        if( !is_ip(itr->first.c_str()) ||  itr->first != host ) {
            itr++;
            continue; //只同步指定的主机
        }        
        std::string slave_path = get_sync_path(dir.c_str(), itr->second->path); 
        srv_str += itr->first;
        srv_str += ":";
        srv_str += slave_path;
        std::string port = str(itr->second->port);             
        
        std::string ident_file = "/root/.ssh/";        
        ident_file += conf->get_str("sync.user");
        ident_file += "_id_rsa";
        
        std::string ssh_params = "--rsh=ssh ";
        ssh_params += "-p ";
        ssh_params += port;
        ssh_params += " -i ";
        ssh_params += ident_file;        
                
        itr++;
        pid_t pid = fork();
        if( pid > 0){
            continue;
        }else{
            //rsync -v -u -a --delete --rsh=ssh --stats localfile.txt username@remote_ip:/home/username/
            //printf("rsync -v -u -a --dirs --delete %s -i %s --stats %s %s\n", port.c_str() , ssh_params.c_str(), dir.c_str(), srv_str.c_str());
            execl("/usr/bin/rsync", "rsync", "-v", "-u", "-a", "--dirs", "--delete", ssh_params.c_str(), "--stats", dir.c_str(), srv_str.c_str(), NULL);              
            exit(0);
        }        
    }
}

void Sync::update_file(const char *file){
    if( !(is_file(file)) ) return;    
    char *path;
    int f_size = strlen(file) + 1;
    path = (char *)malloc(f_size);
    memmove(path, file, f_size);
    char *p = strrchr(path, '/');
    if(NULL == p){
        return;
    }
    *(++p) = '\0';             
    std::map<string, s_info>::iterator itr = host_map.begin();
    for( ; itr != host_map.end(); ){
        if( !is_ip(itr->first.c_str()) ) return;
        std::string slave_path = get_sync_path(file, itr->second->path);
        std::string srv_str = conf->get_str("sync.user");
        srv_str += "@";
        srv_str += itr->first;
        //printf("itr->first: %s slave_path: %s\n", itr->first.c_str(), slave_path.c_str());
        srv_str += ":";
        srv_str += slave_path;
        std::string port = str(itr->second->port);
        
        std::string ident_file = "/root/.ssh/";        
        ident_file += conf->get_str("sync.user");
        ident_file += "_id_rsa";
        
        itr++;
        pid_t pid = fork();
        if( pid > 0){
            continue;
        }else{   
            //printf("/usr/bin/scp -P %s -i %s -r -v %s %s\n", port.c_str(), ident_file.c_str(), file, srv_str.c_str());
            execl("/usr/bin/scp", "scp", "-P", port.c_str(), "-i", ident_file.c_str(), "-r", file, srv_str.c_str(), NULL);              
            exit(0);
        }
    } 
}

/**
 * Runs a shell command through popen() and logs every line it prints.
 *
 * @param cmdstr command line handed to popen().
 * @return 1 when the command's output was read to the end;
 *         0 when cmdstr is NULL, popen() failed, or an output line contains
 *         the text "Connection timed out".
 *
 * The timeout is detected with strstr(): strpbrk() would match any line that
 * contains a single character of that phrase. pclose() is only called on a
 * stream that was actually opened.
 */
int Sync::exec_cmdstr(const char *cmdstr){
    if(cmdstr == NULL){
        return 0;
    }

    FILE *fp = popen(cmdstr, "r");
    if(fp == NULL){
        log_info("execute %s failed.", cmdstr);
        return 0;
    }

    int ok = 1;
    char buf[200] = {0};
    while(NULL != fgets(buf, sizeof(buf), fp)){
        log_info( " %s ", buf);
        if( strstr(buf, "Connection timed out") != NULL ){
            log_info("Error: Connection timed out.");
            ok = 0;
            break;
        }
    }
    pclose(fp);
    return ok;
}

/**
 * Worker-thread entry point: a small TCP command server.
 *
 * Binds a listening socket to the configured "server.ip"/"server.port" and,
 * until thread_quit is set, serves one client at a time:
 *   - "copy <ip>\n"  -> sync->sync_dir(<ip>)
 *   - anything else  -> sync->handle_data() followed by sync->handle_sync()
 * Each processed message is answered with "ok.\n". Out-of-band data is read
 * and printed only.
 *
 * Any socket setup failure, a non-retryable accept() error or a select()
 * error other than EINTR ends the whole process through the sock_err path.
 *
 * Fixes relative to the previous revision:
 *   - the reply sends exactly the 4 bytes of "ok.\n" (it used to send 9,
 *     reading past the literal);
 *   - the copy argument is stripped of trailing CR/LF only, so an empty
 *     argument no longer indexes position -1 and an argument without a
 *     newline no longer loses its last character;
 *   - the fd sets are rebuilt for every client, and select() is retried when
 *     a signal (e.g. SIGCHLD) interrupts it instead of terminating the
 *     process;
 *   - the out-of-band receive loop stops on errors instead of spinning.
 *
 * NOTE(review): thread_quit is only examined between clients, so Sync::stop()
 * blocks until accept() returns - confirm whether that is acceptable.
 * NOTE(review): handle_data()/handle_sync() are also called from Sync::run()
 * and no locking is visible in this file - confirm they never run
 * concurrently on the same object.
 *
 * @param arg the owning Sync instance.
 * @return always NULL.
 */
void* Sync::_run_thread(void* arg){
    Sync *sync = (Sync *)arg;
    int sock = -1;
    int opt = 1;
    int backlog = 1024;

    const std::string ip = sync->conf->get_str("server.ip");
    int port = str_to_int(sync->conf->get_str("server.port"));
    struct sockaddr_in addr;
    bzero(&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons((short)port);
    inet_pton(AF_INET, ip.c_str(), &addr.sin_addr);

    if( (sock = ::socket(AF_INET, SOCK_STREAM, 0)) == -1 ){
        fprintf(stdout, "socket 1");
        goto sock_err;
    }

    if( ::setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1){
        fprintf(stdout, "socket 2 errno: %s\n", strerror(errno));
        goto sock_err;
    }

    if( ::bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1 ){
        fprintf(stdout, "socket 3");
        goto sock_err;
    }

    if( ::listen(sock, backlog) == -1 ){
        fprintf(stdout, "socket 4");
        goto sock_err;
    }

    printf("sock = %d\n", sock);

    while(!sync->thread_quit){
        int client_sock;
        struct sockaddr_in client_addr;
        socklen_t client_addrlen = sizeof(client_addr);
        struct linger l_opt = {1, 0};
        int ret;
        char buf[1024];
        fd_set read_fds;
        fd_set exception_fds;

        while ( (client_sock = ::accept(sock, (struct sockaddr *)&client_addr, &client_addrlen)) == -1 ){
            if( !(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) ){
                log_error("socket %d accept failed: %s", sock, strerror(errno));
                goto sock_err;
            }
            // accept() treats the length as in/out: reset it before retrying.
            client_addrlen = sizeof(client_addr);
        }
        fprintf(stdout, "client sock: %d\n", client_sock);

        if( client_sock >= FD_SETSIZE ){
            // Cannot be stored in an fd_set; drop this client.
            log_error("socket %d is too large for select()", client_sock);
            close(client_sock);
            continue;
        }

        ret = ::setsockopt(client_sock, SOL_SOCKET, SO_LINGER, (void *)&l_opt, sizeof(l_opt));
        if( ret != 0){
            log_error("socket %d set linger failed: %s", client_sock, strerror(errno));
        }

        memset(buf, '\0', sizeof(buf));

        /* select() modifies the sets it is given, so they are rebuilt before
         * every call; an interrupted call is simply repeated. */
        do {
            FD_ZERO(&read_fds);
            FD_ZERO(&exception_fds);
            FD_SET(client_sock, &read_fds);
            FD_SET(client_sock, &exception_fds);
            ret = select(client_sock + 1, &read_fds, NULL, &exception_fds, NULL);
        } while(ret < 0 && errno == EINTR);

        if(ret < 0){
            log_info("sync client selection failure\n");
            close(client_sock);
            goto sock_err;
        }

        /* Readable: read ordinary data with a plain recv(). */
        if(FD_ISSET(client_sock, &read_fds)){
            while(1){
                ret = recv(client_sock, buf, sizeof(buf) - 1, 0);
                if(ret == 0) break;   // peer closed the connection
                if(ret < 0){
                    if( errno == EINTR){
                        errno = 0;
                        usleep(100);
                        continue;
                    }
                    break;
                }

                printf("get %d bytes of normal data: %s\n", ret, buf);

                // Split off the first word to recognise the command.
                std::string cmd;
                char *p = strpbrk(buf, " \t");
                if(NULL != p){
                    *p = '\0';
                    cmd = trim(buf);
                }

                if( cmd == "copy"){
                    // p is non-NULL here: cmd is only set when a separator was found.
                    std::string target_ip = p + 1;
                    while( !target_ip.empty() &&
                           (target_ip[target_ip.size() - 1] == '\n' || target_ip[target_ip.size() - 1] == '\r') ){
                        target_ip.resize(target_ip.size() - 1);
                    }
                    printf("ip: %s\n", target_ip.c_str());
                    sync->sync_dir(target_ip.c_str());
                }else{
                    if(NULL != p){
                        *p = '\t';   // restore a separator so handle_data() sees the full record
                    }
                    sync->handle_data(buf);
                    sync->handle_sync();
                }
                send(client_sock, "ok.\n", strlen("ok.\n"), 0);
                memset(buf, '\0', sizeof(buf));
            }
        } else if(FD_ISSET(client_sock, &exception_fds)){
            /* Exceptional condition: read out-of-band data with MSG_OOB. */
            while(1){
                ret = recv(client_sock, buf, sizeof(buf) - 1, MSG_OOB);
                if(ret < 0){
                    if( errno == EINTR ){
                        continue;
                    }
                    break;
                }
                if(ret == 0) break;
                printf("get %d bytes of oob data:%s\n", ret, buf);
                memset(buf, '\0', sizeof(buf));
            }
        }
        close(client_sock);
    }

    close(sock);


    log_info("Sync thread quit");
    return (void *)NULL;

sock_err:
        log_fatal("Sync thread exit unexpectedly");
        exit(0);
        return (void *)NULL;
}